KAFKA-20287: Fix CF handle leaks #21751
| throw new ProcessorStateException(fatalMessage, fatal); | ||
| } catch (final RocksDBException e) { | ||
| throw new ProcessorStateException("Error opening store " + name, e); | ||
| openRocksDB(dbOptions, columnFamilyOptions); |
This method internally calls createColumnFamilies() or mergeColumnFamilyHandleLists(), which might fail.
I am wondering if we should do the necessary exception handling inside openRocksDB() instead?
I see that openRocksDB() already handles its own cleanup (CF handles + db), but closeNativeResources() in openDB() covers a different failure scope that openRocksDB() can't handle.
These 6 native resources (userSpecifiedOptions, cache, filter, wOptions, fOptions, statistics) are all created in openDB() before openRocksDB() is ever called; openRocksDB() doesn't own them and has no references with which to clean them up.
If openRocksDB() itself fails, its finally block cleans up everything it created.
If openRocksDB() succeeds but cfAccessor.open() fails, the db is already open, so closeNativeResources() handles the full teardown.
The other option is to move everything into openRocksDB(), but this would change the method signature across all overrides.
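To make the two failure scopes described above concrete, here is a minimal sketch. It assumes hypothetical stand-in classes (`Res`, `CleanupScopeSketch`, `openInner`) rather than the real Kafka or RocksDB types, and it is not the code from this PR: the inner step releases only what it created, while the outer step tears down everything allocated so far.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only: none of these names are Kafka or RocksDB APIs.
final class CleanupScopeSketch {
    // Records the order in which fake resources are closed.
    static final List<String> CLOSED = new ArrayList<>();

    static final class Res implements AutoCloseable {
        private final String name;
        Res(final String name) { this.name = name; }
        @Override public void close() { CLOSED.add(name); }
    }

    private Res options; // allocated by openDb() before the inner call
    private Res cache;   // allocated by openDb() before the inner call
    private Res db;      // allocated by openInner()

    // Inner step: cleans up only what it created itself.
    private void openInner(final boolean failInner) {
        final Res opened = new Res("db");
        try {
            if (failInner) {
                throw new IllegalStateException("creating column families failed");
            }
            db = opened;
        } catch (final RuntimeException e) {
            opened.close();
            throw e;
        }
    }

    // Outer step: owns options and cache, and tears down everything on any failure.
    void openDb(final boolean failInner, final boolean failAccessor) {
        try {
            options = new Res("options");
            cache = new Res("cache");
            openInner(failInner);
            if (failAccessor) {
                throw new IllegalStateException("accessor open failed");
            }
        } catch (final RuntimeException e) {
            closeNativeResources();
            throw e;
        }
    }

    // Null-safe teardown, because some fields may not be initialized yet.
    private void closeNativeResources() {
        for (final Res r : new Res[] {db, cache, options}) {
            if (r != null) {
                r.close();
            }
        }
        db = null;
        cache = null;
        options = null;
    }
}
```

In this sketch, both a failure inside `openInner` and a later "accessor" failure end with the db, cache, and options stand-ins closed exactly once.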
| success = true; | ||
| } finally { | ||
| if (!success) { | ||
| closeNativeResources(); |
This runs if openRocksDB() or cfAccessor.open() fails.
| noTimestampColumnFamily.close(); | ||
| boolean success = false; | ||
| try { | ||
| try (final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily)) { |
Updated to use try-with-resources.
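As an illustration of why the iterator is moved into try-with-resources, the sketch below contrasts a manual `close()` with a scoped one. `FakeIterator` and both method names are hypothetical stand-ins and not the RocksDB `RocksIterator` API; the only assumption is that the iterator holds a resource that must be released.

```java
// Hypothetical stand-in for an iterator that holds a native resource.
final class IteratorScopeSketch {
    // Number of iterators that were opened but not yet closed.
    static int openIterators = 0;

    static final class FakeIterator implements AutoCloseable {
        private final boolean failOnSeek;

        FakeIterator(final boolean failOnSeek) {
            this.failOnSeek = failOnSeek;
            openIterators++;
        }

        boolean hasData() {
            if (failOnSeek) {
                throw new IllegalStateException("seek failed");
            }
            return false;
        }

        @Override
        public void close() {
            openIterators--;
        }
    }

    // Manual close: the iterator leaks if hasData() throws.
    static boolean checkManual(final boolean failOnSeek) {
        final FakeIterator iter = new FakeIterator(failOnSeek);
        final boolean result = iter.hasData();
        iter.close();
        return result;
    }

    // try-with-resources: close() runs on every exit path.
    static boolean checkScoped(final boolean failOnSeek) {
        try (FakeIterator iter = new FakeIterator(failOnSeek)) {
            return iter.hasData();
        }
    }
}
```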
| boolean success = false; | ||
| try { | ||
| // Check if default CF has data (plain store upgrade) | ||
| try (final RocksIterator defaultIter = db.newIterator(defaultCf)) { |
Also updated to use try-with-resources.
| boolean success = false; | ||
| try { | ||
| // verify and close empty Default ColumnFamily | ||
| try (final RocksIterator defaultIter = db.newIterator(columnFamilies.get(0))) { |
Updated here in the same way, with try-with-resources.
| noHeadersColumnFamily.close(); | ||
| boolean success = false; | ||
| try { | ||
| try (final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily)) { |
Updated here to use try-with-resources.
@mjsax do you think this PR addresses the CF handle leaks? Would you be able to review it?
mjsax left a comment
Thanks for the PR. Made a first pass. -- In the meantime, we added RocksDBMigratingWindowStoreWithHeaders that we should include in this PR.
| * Used only by the error cleanup path in {@link #openDB} where some resources | ||
| * may not have been initialized yet. | ||
| */ | ||
| private void closeNativeResources() { |
We already have code in close() which closes all these resources -- is this new helper necessary? I would believe that the runtime ensures that we call RocksDBStore.close() if init() (which calls openDB()) fails.
In close(), we check isOpen(), which in turn checks open.get().
That flag depends on openDB()/openRocksDB() completing, so if either of them fails, isOpen() returns false.
When it returns false, close() does not close all of these resources.
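The behaviour described in this reply can be sketched as follows. This is a simplified model based only on the description in this thread (a `close()` guarded by an `open` flag), using hypothetical names; it has not been checked against the actual `RocksDBStore` source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a store whose close() is guarded by an "open" flag.
final class GuardedCloseSketch {
    final AtomicBoolean open = new AtomicBoolean(false);
    // Stands in for a native resource allocated early during init().
    boolean optionsAllocated = false;

    void init(final boolean fail) {
        optionsAllocated = true; // allocated before the open can fail
        if (fail) {
            throw new IllegalStateException("open failed");
        }
        open.set(true); // only reached when the open succeeded
    }

    void close() {
        if (!open.get()) {
            return; // a failed init() lands here, so nothing below runs
        }
        optionsAllocated = false;
        open.set(false);
    }
}
```

Under this model, calling `close()` after a failed `init()` leaves `optionsAllocated` set, which is the gap the dedicated cleanup helper is meant to cover.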
| throw new ProcessorStateException(fatalMessage, fatal); | ||
| } catch (final RocksDBException e) { | ||
| throw new ProcessorStateException("Error opening store " + name, e); | ||
| openRocksDB(dbOptions, columnFamilyOptions); |
I am wondering if we should do the necessary exception-handling inside openRocksDB instead?
| mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors); | ||
| openSuccess = true; | ||
| return result; | ||
| } finally { |
Why not use catch (Exception e)? That would let us drop the openSuccess flag.
Agreed on removing the success flag. Updated to use a catch block.
| for (final ColumnFamilyHandle handle : createdColumnFamilies) { | ||
| handle.close(); | ||
| } | ||
| db.close(); |
I don't think we need to close the db -- this should happen in RocksDBStore.close() already?
Yes, db.close() is not required again. Removed.
| .collect(Collectors.toList()); | ||
| final List<ColumnFamilyHandle> existingColumnFamilies = new ArrayList<>(existingDescriptors.size()); | ||
| final List<ColumnFamilyHandle> createdColumnFamilies = new ArrayList<>(); | ||
| db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies); |
I think this call should also go inside the try-catch block?
Yes. The RocksDB.open() call is now inside the try-catch block, so if createColumnFamilies() or mergeColumnFamilyHandleLists() throws, the existingColumnFamilies handles populated by RocksDB.open() are properly cleaned up.
| boolean openSuccess = false; | ||
| try { | ||
| createdColumnFamilies.addAll(db.createColumnFamilies(toCreate)); | ||
| final List<ColumnFamilyHandle> result = |
| final List<ColumnFamilyHandle> result = | |
| final List<ColumnFamilyHandle> allColumnFamilies = |
Now returning it inline instead of introducing a new variable.
| } | ||
| } | ||
| success = true; | ||
| } finally { |
Why not use catch (Exception) and get rid of the success flag?
Done. I also had to replace Exception with RuntimeException (SpotBugs).
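For reference, the two shapes discussed here can be compared side by side. The sketch below uses a hypothetical `Handle` stand-in rather than RocksDB's `ColumnFamilyHandle`, and it only illustrates the pattern, not the exact code in this PR.

```java
import java.util.List;

// Hypothetical stand-ins: Handle is not the RocksDB ColumnFamilyHandle class.
final class CatchVersusFlagSketch {
    static final class Handle implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    private static void closeAll(final List<Handle> handles) {
        for (final Handle handle : handles) {
            handle.close();
        }
    }

    // Variant A: success flag plus finally.
    static void openWithFlag(final List<Handle> handles, final boolean fail) {
        boolean success = false;
        try {
            if (fail) {
                throw new IllegalStateException("open failed");
            }
            success = true;
        } finally {
            if (!success) {
                closeAll(handles);
            }
        }
    }

    // Variant B: catch, clean up, rethrow. No flag needed.
    static void openWithCatch(final List<Handle> handles, final boolean fail) {
        try {
            if (fail) {
                throw new IllegalStateException("open failed");
            }
        } catch (final RuntimeException e) {
            closeAll(handles);
            throw e;
        }
    }
}
```

One difference worth keeping in mind: the flag variant also runs its cleanup when an `Error` or a checked exception escapes the try block, whereas `catch (RuntimeException)` covers unchecked exceptions only.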
| } | ||
| } | ||
| success = true; | ||
| } finally { |
| } | ||
| } | ||
| success = true; | ||
| } finally { |
Applied the above change here too.
| } | ||
| } | ||
| success = true; | ||
| } finally { |
We should use catch(Exception) instead
Thanks @muralibasani. Good catch.
Just needs a rebase (and Matthias's earlier comments addressed).
0587c17 to 22b1dd2
@mjsax @aliehsaeedii Updated PR based on comments.
| noHeadersColumnFamily.close(); | ||
| try { | ||
| // Check if DEFAULT CF has data (upgrade from old format without headers) | ||
| try (final RocksIterator noHeadersIter = db.newIterator(noHeadersColumnFamily)) { |
Moved newIterator() into try-with-resources.
mjsax left a comment
Thanks. A few more cosmetics. Overall LGTM.
| setupStatistics(configs, dbOptions); | ||
| openRocksDB(dbOptions, columnFamilyOptions); | ||
| dbAccessor = new DirectDBAccessor(db, fOptions, wOptions); | ||
| boolean success = false; |
Seems you missed this one for removal?
| throw new ProcessorStateException("Error opening store " + name, e); | ||
| } | ||
| success = true; | ||
| } finally { |
Agree, removed.
| withHeadersColumnFamily, | ||
| HeadersBytesStore::convertToHeaderFormat, | ||
| this, | ||
| open |
| noTimestampColumnFamily, | ||
| withTimestampColumnFamily, | ||
| TimestampedBytesStore::convertToTimestampedFormat, | ||
| this, open |
| this, open | |
| this, | |
| open |
| headersCf, | ||
| HeadersBytesStore::convertFromPlainToHeaderFormat, | ||
| this, | ||
| open |
| log.info("Opening store {} in regular mode", name); | ||
| cfAccessor = new SingleColumnFamilyAccessor(offsetsCf, withHeadersColumnFamily); | ||
| noHeadersColumnFamily.close(); | ||
| try { |
Why do we need to nest two try-catch blocks? With the success flag being removed, isn't one sufficient? (Also elsewhere.)
Indeed, the nesting is not needed. Removed it elsewhere too.
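One way to collapse the nesting is to attach the catch clause directly to the try-with-resources statement. In Java, the resource is closed before the catch block of the same statement runs, so a single block handles both the iterator and the handle cleanup. The sketch below uses hypothetical stand-ins and is not necessarily the exact shape the PR ended up with.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins: FakeIterator is not the RocksDB RocksIterator class.
final class SingleTrySketch {
    // Records what happened, in order.
    static final List<String> EVENTS = new ArrayList<>();

    static final class FakeIterator implements AutoCloseable {
        private final boolean fail;

        FakeIterator(final boolean fail) {
            this.fail = fail;
        }

        boolean isValid() {
            if (fail) {
                throw new IllegalStateException("iterator failed");
            }
            return false;
        }

        @Override
        public void close() {
            EVENTS.add("iterator closed");
        }
    }

    // One try block: the resource clause closes the iterator,
    // the catch clause releases the handles and rethrows.
    static void open(final boolean fail) {
        try (FakeIterator iter = new FakeIterator(fail)) {
            if (iter.isValid()) {
                EVENTS.add("upgrade mode");
            }
        } catch (final RuntimeException e) {
            EVENTS.add("handles closed");
            throw e;
        }
    }
}
```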
2f24bc1 to 230723a
Thank you. Please take another look.
RocksDBStore.java — openRocksDB() base method: Added finally block after RocksDB.open() to close all CF handles and db if createColumnFamilies() or mergeColumnFamilyHandleLists() fails.
RocksDBStore.java — openDB(): Added finally block with closeNativeResources() to close all partially-initialized native resources if openRocksDB() or cfAccessor.open() fails.
RocksDBTimestampedStore.java: Changed RocksIterator from manual close() to try-with-resources, and added finally block to close all CF handles if an exception occurs before the accessor takes ownership.
RocksDBMigratingSessionStoreWithHeaders.java: Same as above — try-with-resources for RocksIterator and finally block for CF handle cleanup on failure.
RocksDBTimestampedStoreWithHeaders.java — openFromDefaultStore(): Added finally block to close all CF handles if an exception occurs before the accessor takes ownership.
RocksDBTimestampedStoreWithHeaders.java — openFromTimestampedStore(): Replaced manual per-handle close() calls (which missed columnFamilies.get(3)) with a single finally block that loops over all handles on failure.
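The last item in the list above, replacing per-handle `close()` calls with one loop, can be illustrated with a small sketch. `Handle` is a hypothetical stand-in rather than RocksDB's `ColumnFamilyHandle`, and the four-element list only mirrors the "missed index 3" case mentioned there.

```java
import java.util.List;

// Hypothetical stand-ins: Handle is not the RocksDB ColumnFamilyHandle class.
final class CloseAllHandlesSketch {
    static final class Handle implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    // Manual per-handle cleanup: easy to miss a handle when the list grows.
    static void closeManually(final List<Handle> handles) {
        handles.get(0).close();
        handles.get(1).close();
        handles.get(2).close();
        // handles.get(3) is never closed here
    }

    // Loop-based cleanup: covers every handle regardless of list size.
    static void closeAll(final List<Handle> handles) {
        for (final Handle handle : handles) {
            handle.close();
        }
    }
}
```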
Reviewers: Matthias J. Sax <matthias@confluent.io>